package com.lianda.example.hot.sql;

import com.lianda.example.hot.HotItems;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Properties;

public class TableSqlTestUtil {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "user_action";

    //写数据到kafka
    public static void writeToKafka() throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("group.id", "sql_test");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<String, String>(props);

        URL fileUrl = TableSqlTestUtil.class.getClassLoader().getResource("data/sql.csv");
        BufferedReader file = new BufferedReader(new InputStreamReader(new FileInputStream(fileUrl.getFile()), "UTF-8"));
        String action;

        while ((action = file.readLine()) != null) {
            ProducerRecord record = new ProducerRecord<String, String>
                    (topic, null, null, action);
            producer.send(record);
            System.out.println("发送数据: " + action);
            Thread.sleep(1000);
        }
        System.out.println("========完成=========");
    }

    public static void main(String[] args)  throws Exception {
        writeToKafka();
    }
}
